/*
 * Unidata Platform Community Edition
 * Copyright (c) 2013-2020, UNIDATA LLC, All rights reserved.
 * This file is part of the Unidata Platform Community Edition software.
 *
 * Unidata Platform Community Edition is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * Unidata Platform Community Edition is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <https://www.gnu.org/licenses/>.
 */

package org.unidata.mdm.data.type.apply.batch.impl;

import java.util.HashMap;
import java.util.ListIterator;
import java.util.Map;
import java.util.Objects;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.unidata.mdm.core.type.keys.OriginKey;
import org.unidata.mdm.data.context.DataContextFlags;
import org.unidata.mdm.data.context.RestoreRecordRequestContext;
import org.unidata.mdm.data.dto.RestoreRecordDTO;
import org.unidata.mdm.data.po.data.RecordVistoryPO;
import org.unidata.mdm.data.service.segments.records.batch.RecordsRestoreStartExecutor;
import org.unidata.mdm.data.type.apply.RecordRestoreChangeSet;
import org.unidata.mdm.data.type.keys.RecordKeys;
import org.unidata.mdm.data.type.keys.RecordOriginKey;
import org.unidata.mdm.system.type.batch.BatchIterator;
import org.unidata.mdm.system.type.pipeline.fragment.FragmentId;
import org.unidata.mdm.system.type.pipeline.fragment.InputFragment;

/**
 * Simple record batch set accumulator for record restore operations.
 * Collects the persistent objects and index requests, produced by single restore contexts,
 * and keeps track of the origin revisions, assigned to vistory records within the current batch.
 *
 * @author Mikhail Mikhailov
 */
public class RecordRestoreBatchSetAccumulator
    extends AbstractRecordBatchSetAccumulator<RestoreRecordRequestContext, RestoreRecordDTO, RecordRestoreBatchSetAccumulator>
    implements InputFragment<RecordRestoreBatchSetAccumulator> {
    /**
     * Fragment ID for convenience.
     */
    public static final FragmentId<RecordRestoreBatchSetAccumulator> ID
        = new FragmentId<>("RECORD_RESTORE_BATCH_SET");
    /**
     * The run stats.
     */
    private final RecordRestoreBatchSetStatistics statistics;
    /**
     * Origin keys and the last revision, handed out for them, indexed by origin id.
     * The pairs stored here are immutable, an entry is replaced each time the revision advances.
     */
    private final Map<String, Pair<OriginKey, Integer>> ids;
    /**
     * Constructor.
     * @param commitSize chunk size
     */
    public RecordRestoreBatchSetAccumulator(int commitSize) {
        super(commitSize);
        this.statistics = new RecordRestoreBatchSetStatistics();
        this.ids = new HashMap<>(commitSize);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FragmentId<RecordRestoreBatchSetAccumulator> fragmentId() {
        return ID;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public String getStartTypeId() {
        return RecordsRestoreStartExecutor.SEGMENT_ID;
    }
    /**
     * {@inheritDoc}
     * Additionally resets the statistics and drops the cached origin revisions.
     */
    @Override
    public void discharge() {
        super.discharge();
        statistics.reset();
        ids.clear();
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public BatchIterator<RestoreRecordRequestContext> iterator() {
        return new RecordRestoreBatchIterator();
    }
    /**
     * Ugly stuff, made public because of containment relations.
     * Accumulates the objects, collected in the change set of a restore context
     * (etalon update, origin updates, vistory records and index requests),
     * and clears the change set afterwards.
     * @param ctx the restore context, whose change set has to be accumulated
     */
    @Override
    public void accumulate(RestoreRecordRequestContext ctx) {

        RecordRestoreChangeSet batchSet = ctx.changeSet();
        accumulateEtalonUpdate(batchSet.getEtalonRecordUpdatePO());
        accumulateOriginUpdates(batchSet.getOriginRecordUpdatePOs());
        accumulateVistoryUpdates(ctx, batchSet);

        if (CollectionUtils.isNotEmpty(batchSet.getIndexRequestContexts())) {
            indexUpdates.addAll(batchSet.getIndexRequestContexts());
        }

        batchSet.clear();
    }
    /**
     * Assigns revisions to the vistory records of the change set and accumulates them.
     * The first record, seen for an origin id, gets the revision of the origin key + 1,
     * each subsequent record for the same origin id gets the previously assigned revision + 1.
     * @param ctx the restore context, supplies the record keys
     * @param batchSet the change set, supplies the vistory records
     * @throws NullPointerException if the keys of the context contain no origin key for the origin id of a vistory record
     */
    private void accumulateVistoryUpdates(RestoreRecordRequestContext ctx, RecordRestoreChangeSet batchSet) {

        // COPY support, revision must be known beforehand.
        // Restore may supply several vistory objects for the same origin key
        final RecordKeys keys = ctx.keys();
        for (RecordVistoryPO v : batchSet.getOriginsVistoryRecordPOs()) {

            final String originId = v.getOriginId();

            Pair<OriginKey, Integer> cachedKeys = ids.get(originId);
            if (Objects.isNull(cachedKeys)) {

                RecordOriginKey rok = keys.findByOriginId(originId);

                Objects.requireNonNull(rok, "Record origin key is null for supplied ID!");

                cachedKeys = Pair.of(rok, rok.getRevision());
            }

            final int nextRevision = cachedKeys.getValue() + 1;
            v.setRevision(nextRevision);

            accumulateVistory(v);

            // Pair.of(..) creates an immutable pair, whose setValue(..) throws UnsupportedOperationException.
            // Thus the cache entry is replaced with a fresh pair instead of being modified in place.
            ids.put(originId, Pair.of(cachedKeys.getKey(), nextRevision));
        }
    }
    /**
     * {@inheritDoc}
     */
    @SuppressWarnings("unchecked")
    @Override
    public RecordRestoreBatchSetStatistics statistics() {
        return statistics;
    }
    /**
     * Simple batch iterator.
     * Accumulates the change set of an element as soon as the iteration moves past it,
     * i. e. upon the following call to {@link #next()} or upon reaching the end of the working copy.
     * Elements, removed via {@link #remove()}, are not accumulated.
     *
     * @author Mikhail Mikhailov
     */
    private class RecordRestoreBatchIterator implements BatchIterator<RestoreRecordRequestContext> {
        /**
         * List iterator.
         */
        private final ListIterator<RestoreRecordRequestContext> i = workingCopy.listIterator();
        /**
         * Current entry, i. e. the last returned element, which is not accumulated (or removed) yet.
         */
        private RestoreRecordRequestContext current = null;
        /**
         * Constructor.
         */
        public RecordRestoreBatchIterator() {
            super();
        }
        /**
         * If there are more elements to iterate.
         * Flushes the last returned element to the accumulator, when the end is reached.
         * @return true, if so, false otherwise
         */
        @Override
        public boolean hasNext() {

            boolean hasNext = i.hasNext();
            if (!hasNext && current != null) {
                accumulate(current);
                // The element is flushed. Forget it, so that repeated calls at the end
                // of iteration do not accumulate the same (already cleared) change set again.
                current = null;
            }

            return hasNext;
        }
        /**
         * Next context for record restore.
         * Accumulates the previously returned context (if it was not removed) before handing out the next one.
         * @return next context
         */
        @Override
        public RestoreRecordRequestContext next() {

            RestoreRecordRequestContext next = i.next();
            if (current != null) {
                accumulate(current);
            }

            init(next);

            current = next;
            return next;
        }
        /**
         * Removes current element. The element will not be accumulated.
         */
        @Override
        public void remove() {
            i.remove();
            current = null;
        }

        /**
         * Does some preprocessing.
         * @param ctx the restore context
         */
        private void init(RestoreRecordRequestContext ctx) {

            // Supply a fresh change set, unless the context already has one
            if (Objects.isNull(ctx.changeSet())) {
                ctx.changeSet(new RecordRestoreChangeSet());
            }

            // Ensure the flag is set
            ctx.setFlag(DataContextFlags.FLAG_BATCH_OPERATION, true);
        }
    }
}
